package com.kaigejava.flink.chapter01;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author 凯哥Java
 * @description Unbounded stream-processing word count. Listens on a text socket
 *              source (e.g. {@code nc -lk 7777}) and prints running counts.
 * @company
 * @since 2022/11/21 16:50
 */
public class StreamWordCount {

    /** Default socket source address; can be overridden via program arguments. */
    private static final String DEFAULT_HOST = "192.168.50.137";
    private static final int DEFAULT_PORT = 7777;

    /**
     * Unbounded stream word count: reads lines from a text socket, splits them
     * into words, and prints a running count per word.
     *
     * <p>Start a text source first, e.g. {@code nc -lk 7777}.
     *
     * @param args optional overrides: {@code args[0]} = host, {@code args[1]} = port
     *             (defaults: {@value #DEFAULT_HOST}:{@value #DEFAULT_PORT})
     * @throws Exception if the Flink job fails to start or terminates abnormally
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // 1: create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2: unbounded source — one record per line received on the socket
        DataStreamSource<String> lineDSS = env.socketTextStream(host, port);

        // 3: split each line into (word, 1) pairs; the explicit returns(...) is
        // required because type erasure hides the lambda's tuple types from Flink
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4: group by the word itself (tuple field f0)
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);

        // 5: running sum over tuple field index 1 (the count)
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKs.sum(1);

        // 6: print running counts to stdout
        result.print();

        // 7: submit and run the job (blocks until the job terminates)
        env.execute("StreamWordCount");
    }

    /**
     * Increments {@code i} twice and returns the result.
     *
     * <p>The original implementation placed a {@code return} inside a
     * {@code finally} block — a well-known Java pitfall: a return from
     * {@code finally} discards the {@code try} block's return value and
     * silently swallows any exception propagating out of {@code try}.
     * The observable result was {@code i + 2}, which is preserved here
     * without the anti-pattern.
     *
     * @param i the starting value
     * @return {@code i + 2}
     */
    private static int test(int i) {
        return i + 2;
    }
}
